Skip to content

Conversation

@MasterPtato
Copy link
Contributor

No description provided.

@vercel
Copy link

vercel bot commented Nov 25, 2025

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Preview Comments Updated (UTC)
rivetkit-serverless Ready Ready Preview Comment Nov 27, 2025 0:59am
3 Skipped Deployments
Project Deployment Preview Comments Updated (UTC)
rivet-cloud Ignored Ignored Nov 27, 2025 0:59am
rivet-inspector Ignored Ignored Preview Nov 27, 2025 0:59am
rivet-site Ignored Ignored Preview Nov 27, 2025 0:59am

Copy link
Contributor Author

MasterPtato commented Nov 25, 2025

Warning

This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
Learn more


How to use the Graphite Merge Queue

Add the label merge-queue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude
Copy link

claude bot commented Nov 25, 2025

PR Review: Implement events and commands for actor workflows

I've reviewed this PR which implements MK2 protocol support for actor workflows. Here are my findings:

Strengths

  1. Well-structured protocol versioning with clear upgrade/downgrade paths
  2. Robust event demuxing with batching and proper cleanup
  3. Careful initialization sequence that handles missed commands
  4. Good per-runner state management with separate event/command indices

Issues & Concerns

1. Channel abort TODO (actor_event_demuxer.rs:81) - HIGH PRIORITY
The TODO about aborting tasks needs addressing. Aborting may lose in-flight events. Consider graceful shutdown or document why abort is safe.

2. Metadata parsing (conn.rs:285-288, 299-303) - MEDIUM PRIORITY
Silent failures via unwrap_or_default() may hide malformed metadata. Add logging for parse failures.

3. Event deduplication (workflows/actor/mod.rs:466-477)
Consider edge cases: What if events arrive out of order? Risk of loss if runner crashes mid-send?

4. Runner state cleanup (workflows/actor/mod.rs:623-625)
Ensure no race condition where events from removed runner recreate state entry.

5. Missing command ack tracking
Event acks are batched but no similar tracking for command acks. Intentional?

Security

  • Actor ownership validation: Good ✓
  • Generation checking: Proper filtering ✓

Performance

  • Good event batching with 1024 buffer
  • Proper transaction isolation levels
  • Consider: Loop processes up to 1024 signals - may need periodic yielding

Test Coverage

Consider adding:

  • Event deduplication tests
  • Missed command recovery tests
  • Multi-runner generation change tests

Recommendations

  1. HIGH: Address abort TODO or document safety
  2. MEDIUM: Add metadata parse failure logging
  3. MEDIUM: Add reconnection integration tests

Overall

Solid implementation with good architecture. Main concern is the task abort TODO.

Recommendation: Approve with minor changes

@MasterPtato MasterPtato force-pushed the 11-24-fix_pb_implement_events_and_commands_for_actor_wfs branch from 88e1336 to 52845b6 Compare November 26, 2025 02:16
@MasterPtato MasterPtato force-pushed the 11-21-fix_make_all_uses_of_protocol_handle_v4_-_v3 branch from 7ddbf54 to 72a5b39 Compare November 26, 2025 02:16
@MasterPtato MasterPtato force-pushed the 11-24-fix_pb_implement_events_and_commands_for_actor_wfs branch from 52845b6 to 0d4625a Compare November 26, 2025 02:18
@pkg-pr-new
Copy link

pkg-pr-new bot commented Nov 26, 2025

More templates

@rivetkit/cloudflare-workers

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/cloudflare-workers@3543

@rivetkit/db

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/db@3543

@rivetkit/framework-base

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/framework-base@3543

@rivetkit/next-js

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/next-js@3543

@rivetkit/react

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/react@3543

rivetkit

pnpm add https://pkg.pr.new/rivet-dev/rivet/rivetkit@3543

@rivetkit/sql-loader

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/sql-loader@3543

@rivetkit/engine-runner

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner@3543

@rivetkit/engine-runner-protocol

pnpm add https://pkg.pr.new/rivet-dev/rivet/@rivetkit/engine-runner-protocol@3543

commit: 02023f8

@claude
Copy link

claude bot commented Nov 26, 2025

Pull Request Review: fix(pb): implement events and commands for actor wfs

Overview

This PR implements events and commands handling for actor workflows in the Pegboard system, specifically adding support for the mk2 protocol version. The changes span across multiple packages including pegboard-runner, pegboard, pegboard-gateway, and related infrastructure.

Code Quality & Architecture

Strengths:

  1. Good abstraction for protocol versions via the Init enum in conn.rs (lines 203-255)
  2. Event batching properly reduces overhead using EVENT_ACK_BATCH_SIZE = 250
  3. Proper state management with RunnerState tracking for multiple runners
  4. Database transaction isolation with custom instrumentation

Potential Bugs & Issues

  1. Missing error handling in metadata parsing (conn.rs:285-288, 299-303): Using unwrap_or_default() silently swallows parsing errors. Consider logging when metadata parsing fails.

  2. Unchecked index arithmetic (actor/mod.rs:468-469): The filtering assumes monotonically increasing indices. If the runner sends duplicate or out-of-order events, this could cause issues.

  3. Potential race condition (conn.rs:88-117): Between checking if a runner exists/is expired and dispatching the workflow, another connection could modify the runner state. Consider documenting this.

  4. Missing protocol version validation (pegboard-gateway/lib.rs:164-174): The code defaults to PROTOCOL_MK1_VERSION when no version is stored, but this could cause issues if a runner reconnects with a different version.

@claude
Copy link

claude bot commented Nov 26, 2025

Performance Considerations

  1. Database read amplification (conn.rs:324-356): Reading all missed commands during init could be expensive for runners offline for long periods. Consider pagination, command expiration, and metrics.

  2. Iterator chaining efficiency (keys/runner.rs:804-806): Nested iterators could be optimized using flat_map.

  3. Event filtering on every signal (actor/mod.rs:464-473): For high-frequency events, consider maintaining a cursor or checkpointing more aggressively.

Security Concerns

  1. No metadata size validation (conn.rs:297-316): A malicious runner could send excessive metadata. Add size validation before chunking.

  2. Command replay vulnerability (conn.rs:323-356): Missed commands are replayed without verifying relevance. Consider adding command expiration.

Test Coverage

Missing test scenarios:

  • No tests for handle_init with prepopulated actor names
  • Missing event deduplication logic tests
  • No metadata chunking/splitting tests
  • Missing protocol version negotiation edge case tests

Database Schema Concerns

Command persistence: Commands are persisted but there is no cleanup logic. This could lead to unbounded growth over time. Add a cleanup mechanism for acknowledged/expired commands.

@claude
Copy link

claude bot commented Nov 26, 2025

Style & Convention Issues

  1. Import ordering (conn.rs:7-8): Should combine duplicate futures_util imports per CLAUDE.md conventions
  2. Formatting inconsistencies: Auto-formatting changes should be in a separate commit

Recommendations

High Priority:

  1. Add size validation for metadata to prevent abuse
  2. Implement command cleanup/expiration logic
  3. Add logging for metadata parsing errors
  4. Add tests for the new event handling logic

Medium Priority:

  1. Document the protocol version fallback behavior
  2. Add metrics for missed command counts and event batch sizes
  3. Optimize iterator chaining in metadata key combining
  4. Consider pagination for missed commands

Low Priority:

  1. Separate formatting changes into their own commit
  2. Combine duplicate imports
  3. Document the EVENT_ACK_BATCH_SIZE constant

Overall Assessment

This is a solid implementation that handles complex protocol versioning and event management. The code is generally well-structured with proper error handling. However, there are areas that need attention around data validation, performance at scale, and test coverage. The main concerns are unbounded growth (metadata size, command storage) and potential edge cases in event deduplication.

Recommendation: Request changes to address high-priority issues, particularly around data validation and resource management.

@claude
Copy link

claude bot commented Nov 26, 2025

PR Review: Implement Events and Commands for Actor Workflows

This PR implements a significant refactoring of the actor lifecycle management system to use persistent events and commands for mk2 protocol runners. Here's my comprehensive review:

✅ Strengths

  1. Protocol Versioning: Good backward compatibility with mk1 protocol while introducing mk2 improvements

    • Clean separation between mk1 and mk2 handling in conn.rs:181-198
    • Proper fallback to PROTOCOL_MK1_VERSION when version is missing
  2. Event Batching: Efficient event acknowledgment with batching (lines actor/mod.rs:586-606)

    • Uses EVENT_ACK_BATCH_SIZE = 250 to reduce message overhead
    • Only sends acks when threshold is exceeded
  3. State Management: Well-structured state tracking per runner

    • RunnerState tracks both last_event_idx and last_event_ack_idx
    • LifecycleRunnerState maintains proper event sequencing
  4. Actor Event Demuxer: Clean separation of concerns

    • Batches events efficiently (actor_event_demuxer.rs:52)
    • Includes proper garbage collection (actor_event_demuxer.rs:76-89)

⚠️ Issues & Concerns

Critical

  1. Missing Error Handling in Connection Init (conn.rs:70)
    If init parsing fails, the connection is dropped without sending a proper error response to the runner. Consider sending an error message before closing.

  2. Silent Event Send Failures (actor_event_demuxer.rs:41, 63)
    Events are silently dropped if the channel is full or closed with underscore pattern. This could lead to event loss without any warning. Consider:

    • Logging when sends fail
    • Implementing backpressure or flow control
    • Alerting when events are dropped
  3. Unsafe Abort in GC (actor_event_demuxer.rs:84)
    There's a TODO comment indicating uncertainty about task abortion safety. Aborting tasks can leave state inconsistent. Consider:

    • Sending a shutdown signal to the task instead
    • Waiting for graceful completion with a timeout
    • Documenting why abort is safe if it is
  4. Race Condition in Protocol Version (pegboard-gateway/lib.rs:165-176)
    The protocol version is read from UDB but uses read_opt with a fallback. If a runner reconnects between mk1 and mk2, there's a small window where the version might be stale or missing.

Medium Priority

  1. Transaction Isolation Inconsistency (runtime.rs:244)
    Mixing Snapshot and Serializable reads in the same transaction could lead to subtle consistency issues. The comment explains the intent but the safety should be verified.

  2. Unbounded Command Index (runtime.rs:1105)
    The command index grows unboundedly (last_command_idx += input.commands.len()). Consider:

    • What happens when it overflows i64::MAX?
    • Implementing command cleanup/archival
  3. Metadata Deserialization (conn.rs:285-288, 300-303)
    Invalid JSON in metadata silently becomes empty with unwrap_or_default(). Consider logging when deserialization fails for debugging.

  4. Missing Context in Trace (actor_event_demuxer.rs:38)
    Good logging added, but the ingest method isn't instrumented. Consider adding tracing::instrument for better traceability.

Low Priority

  1. Code Formatting: The PR includes unrelated formatting changes in api-builder/middleware.rs:189. This is minor but makes the diff harder to review.

  2. Commented-Out Code: Comment formatting change in metrics/buckets.rs:5 seems unnecessary.

📊 Performance Considerations

  1. Event Batching: Good use of batching in actor_event_demuxer.rs:52 with buffer size 1024
  2. Database Queries: The StreamingMode::WantAll in conn.rs:326 could be expensive if there are many missed commands. Consider pagination.
  3. Command Storage: Commands are stored indefinitely in UDB. Consider implementing cleanup based on acknowledgment.

🔒 Security Considerations

  1. Metadata Injection: The prepopulate_actor_names and metadata fields allow runners to write arbitrary data to UDB. Ensure proper validation exists upstream.
  2. Command Replay: Missed commands are replayed on reconnect (conn.rs:377-385). Verify that commands are idempotent to prevent double-execution issues.

🧪 Testing Recommendations

  1. Add tests for event deduplication logic (actor/mod.rs:466-470)
  2. Test command index overflow scenarios
  3. Test event loss scenarios (what happens if demuxer drops events?)
  4. Test mk1 ↔ mk2 protocol migration
  5. Test runner reconnection with missed commands

📝 Documentation Gaps

  1. The new ActorCommandKey structure needs documentation explaining the indexing scheme
  2. The relationship between last_event_idx, last_event_ack_idx, and last_command_idx should be documented
  3. Add comments explaining why certain transactions use Snapshot vs Serializable

✨ Suggestions

  1. Extract Constants: Magic numbers like 1024 for batch sizes should be constants
  2. Metrics: Add metrics for:
    • Events dropped by demuxer
    • Command replay counts
    • Event processing latency
  3. Graceful Degradation: Consider what happens when UDB is slow/unavailable during init

Overall Assessment

This is a substantial and well-architected change that modernizes the actor lifecycle system. The core logic is sound, but there are several areas where error handling and edge cases need attention. The critical issues around silent event dropping and task abortion should be addressed before merging.

Recommendation: Request changes to address critical issues, then approve after fixes.

Files needing attention:

  • engine/packages/pegboard-runner/src/actor_event_demuxer.rs
  • engine/packages/pegboard-runner/src/conn.rs
  • engine/packages/pegboard/src/workflows/actor/runtime.rs

@claude
Copy link

claude bot commented Nov 26, 2025

PR Review: fix(pb): implement events and commands for actor wfs

I've completed a comprehensive review of this PR. This is a significant refactor implementing mk2 protocol support for actor workflows. Overall, the implementation is solid, but I've identified several areas that need attention.

Critical Issues

1. Missing Error Propagation (pegboard-gateway/src/lib.rs:210-213)
The result from send().await? is now being checked, but there's no error handling if the workflow isn't found. Add proper error handling or document why graceful_not_found() is acceptable.

2. Unsafe unwrap_or_default() in Protocol Version (pegboard-gateway/src/lib.rs:172-173)
Silently defaults to MK1 protocol if version isn't found. Log when defaulting to MK1 or consider failing for critical operations.

3. Race Condition in Init Handler (pegboard-runner/src/conn.rs:257-272)
Signal is sent before database writes complete. Workflow may not see prepopulated actor names or metadata. Complete DB writes before sending signal or add synchronization.

4. Silent JSON Parse Failures (pegboard-runner/src/conn.rs:285-288, 299-303)
JSON parsing errors are silently ignored with empty defaults. Add logging for parse failures.

High Priority Issues

5. TODO Comments Need Resolution
Multiple TODOs in pegboard-runner/src/ws_to_tunnel_task.rs - KV operations may be blocking the WebSocket receive loop. Create follow-up issues or resolve in this PR.

6. Missing Tracing (pegboard/src/workflows/actor/runtime.rs:1096-1163)
insert_and_send_commands activity needs structured logging for debugging.

7. HACK Comment (pegboard/src/workflows/actor/runtime.rs:573-574)
Using util::timestamp::now() for create_ts breaks workflow determinism. Use ctx.create_ts() instead.

Medium Priority Issues

8. Unbounded Channels (pegboard-runner/src/actor_event_demuxer.rs:29)
Could cause memory issues. Consider bounded channels with backpressure.

9. Hardcoded EVENT_ACK_BATCH_SIZE
Should this be configurable per deployment?

10. Incomplete Runner State Cleanup
State could accumulate if check_runners fails. Add periodic cleanup or document retention policy.

Positive Observations

  • Excellent protocol abstraction with Init enum
  • Robust error context throughout
  • Proper database subspace management
  • Good structured logging (with room for improvement)
  • Careful transaction design with isolation levels

Test Coverage

Missing tests for:

  • Event demuxer
  • Command insertion and acknowledgment
  • Protocol version fallback

Summary

Overall: Approve with Changes

Priority actions before merge:

  1. Fix init handler race condition
  2. Add error logging for JSON parse failures
  3. Resolve TODO comments or create follow-up issues
  4. Fix HACK with dynamic timestamps

Follow-up issues to create:

  • KV operation performance optimization
  • Test coverage for event/command system
  • Error message sanitization

Great work on this complex refactor!

@claude
Copy link

claude bot commented Nov 26, 2025

Code Review: Events and Commands Implementation for Actor Workflows

I've reviewed this PR which implements a significant architectural improvement to how actor workflows handle events and commands. Here's my detailed feedback:


🎯 Overall Assessment

This is a well-structured PR that introduces important improvements to the actor workflow system, particularly around:

  • Persistent command storage in the database
  • Event acknowledgment batching for better performance
  • Protocol version handling and backward compatibility
  • Better separation between mk1 and mk2 protocol handling

Overall Quality: Good ✅


💡 Strengths

  1. Robust Database Design (engine/packages/pegboard/src/keys/runner.rs)

    • Well-designed key structures for actor commands with proper indexing
    • Good use of chunking for metadata storage (prevents hitting size limits)
    • Proper use of formal keys with serialization/deserialization
  2. Event Batching (engine/packages/pegboard/src/workflows/actor/mod.rs:587-607)

    • Efficient event acknowledgment batching (EVENT_ACK_BATCH_SIZE = 250)
    • Only sends acks when threshold is exceeded, reducing network overhead
  3. Backward Compatibility

    • Proper fallback to PROTOCOL_MK1_VERSION when protocol version is not found
    • Both mk1 and mk2 paths maintained throughout the codebase
    • Good separation of concerns between protocol versions
  4. Error Handling

    • Comprehensive use of context() for error propagation
    • Graceful degradation (e.g., .graceful_not_found() calls)
    • Minimal use of .unwrap() - only .unwrap_or_default() which is safe
  5. Concurrency Safety (engine/packages/pegboard-runner/src/actor_event_demuxer.rs)

    • Good use of per-actor channels to avoid contention
    • Proper cleanup with GC of inactive channels
    • Graceful shutdown handling

⚠️ Issues & Recommendations

Critical Issues

1. JSON Deserialization with Silent Failures (conn.rs:285-288, 299-303)

let metadata = serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(&data.metadata)
    .unwrap_or_default();

Issue: If JSON parsing fails, the error is silently ignored and defaults to an empty map. This could mask data corruption or protocol issues.

Recommendation:

let metadata = match serde_json::from_str::<serde_json::Map<String, serde_json::Value>>(&data.metadata) {
    Ok(m) => m,
    Err(err) => {
        tracing::warn!(?err, metadata=?data.metadata, "failed to parse actor metadata, using empty map");
        Default::default()
    }
};

2. Race Condition Risk in Event Processing (actor/mod.rs:459-471)

let last_event_idx = state.runner_states.entry(runner_id).or_default().last_event_idx;

let new_events = sig.events
    .iter()
    .filter(|event| {
        event.checkpoint.index > last_event_idx &&
        crate::utils::event_generation(&event.inner) == generation
    });

Issue: Events are filtered by last_event_idx, but if the same runner sends multiple batches in quick succession and the workflow processes them out of order, events could be dropped or duplicated.

Recommendation: Consider adding sequence validation or logging when events arrive out of order to detect this scenario early.


Medium Priority Issues

3. Missing Transaction Conflict Handling

In runtime.rs:243-248, the code uses Snapshot isolation to avoid conflicts when checking the queue:

// NOTE: This is not Serializable because we don't want to conflict with other
// inserts/clears to this range
Snapshot,

Concern: While the comment explains the intent, using Snapshot here means the transaction could miss concurrent inserts. This appears intentional, but there's a window where an actor could be added to the queue between the check and the runner allocation scan.

Recommendation: Add a test case to verify this behavior under concurrent load, or add additional validation after the transaction commits.

4. Potential Memory Leak in Event Demuxer (actor_event_demuxer.rs:76-89)

if self.last_gc.elapsed() > GC_INTERVAL {
    self.last_gc = Instant::now();
    
    self.channels.retain(|_, channel| {
        let keep = channel.last_seen.elapsed() < MAX_LAST_SEEN;
        
        if !keep {
            // TODO: Verify aborting is safe here
            channel.handle.abort();
        }
        
        keep
    });
}

Issues:

  • Channel last_seen is never updated in the current implementation, so channels are only removed after 30s of no activity
  • The TODO comment indicates uncertainty about aborting being safe
  • If events continue flowing to an actor, its channel will never be cleaned up

Recommendation:

  1. Update last_seen when ingesting events: channel.last_seen = Instant::now(); after sending
  2. Consider a more graceful shutdown than .abort(), or document why abort is safe
  3. Add metrics to monitor channel count growth

5. Missing Error Context in Handle Init (conn.rs:262-272)

The signal is sent before database operations, which is good for fail-fast, but errors from the database transaction don't include context about which part failed.

Recommendation: Add .context() or wrap sections of the transaction with span instrumentation to help debugging.


Minor Issues & Code Quality

6. Inconsistent Logging Levels (conn.rs:288, 303)

Metadata parsing failures should probably be logged as warnings since they represent a protocol/data issue.

7. Code Duplication in spawn_actor (runtime.rs:556-820)

The mk1 vs mk2 command sending logic is duplicated in three places. Consider extracting to a helper function.

8. Magic Numbers

  • EVENT_ACK_BATCH_SIZE = 250 - Consider making this configurable or document the rationale
  • GC_INTERVAL = 30s, MAX_LAST_SEEN = 30s - Consider making these configurable

9. HACK Comments (runtime.rs:573-575, 597-599)

// HACK: We should not use dynamic timestamp here, but we don't validate if signal data
// changes (like activity inputs) so this is fine for now.
create_ts: util::timestamp::now(),

Recommendation: File a follow-up issue to address this properly (workflow determinism).


🔒 Security Considerations

1. Actor Ownership Validation (ws_to_tunnel_task.rs:183-213)

✅ Good validation that actors belong to the correct runner before processing KV requests. This prevents cross-runner data access.

2. TODO: Error Message Exposure (ws_to_tunnel_task.rs:237, 284, etc.)

Multiple TODOs note: // TODO: Don't return actual error?

Recommendation: Sanitize error messages before sending to runners to avoid leaking internal details. This should be addressed before production use.


🧪 Testing

Missing Test Coverage:

  1. Event ordering edge cases (out-of-order delivery, duplicates)
  2. Concurrent actor allocation with queue insertion
  3. Event demuxer GC behavior
  4. Command acknowledgment with missed commands replay
  5. Protocol version migration scenarios (mk1→mk2 upgrades)

Recommendation: Add integration tests for the event/command flow, especially around the persistence and replay logic.


📊 Performance Considerations

Positive:

  • Event batching reduces signal overhead significantly
  • Actor event demuxer prevents head-of-line blocking
  • Proper use of database transactions with appropriate isolation levels

Concerns:

  • missed_commands query in handle_init scans entire command subspace - could be slow for runners with many actors. Consider pagination or a limit.
  • KV operations are synchronous in the WS handler - the TODO mentions adding a queue, which would be beneficial under load

📝 Documentation

Good:

  • Comments explaining transaction isolation choices
  • Workflow step documentation in actor creation sequence
  • Clear naming of keys and structures

Could Improve:

  • Document the event/command flow lifecycle
  • Add module-level documentation for the event demuxer
  • Document retry/replay semantics for commands

Adherence to Repository Guidelines

Per CLAUDE.md:

  • ✅ Uses custom error types with RivetError derive
  • ✅ Proper structured logging (tracing with ?field syntax)
  • ✅ Lowercase log messages
  • ✅ UUID for IDs, i64 timestamps in milliseconds
  • ✅ Timestamp naming follows *_at convention (though some use *_ts)
  • ✅ Workspace dependencies used correctly
  • ⚠️ A few instances don't follow the "imports at top" guideline (inline imports acceptable but check consistency)

🎬 Conclusion

This is a solid PR that implements a critical piece of infrastructure. The code quality is generally high with good error handling and thoughtful design.

Blocking Issues: None
High Priority: Address JSON parsing silent failures, verify event ordering guarantees
Medium Priority: Event demuxer improvements, test coverage
Low Priority: Code deduplication, configuration for magic numbers

Recommendation: Approve with minor changes requested

The critical path code is sound, but I'd like to see the JSON parsing improved and some consideration of the event ordering edge cases before merge. The other issues can be addressed in follow-up PRs.


Great work on this complex refactoring! The architecture is well-thought-out and the implementation is thorough. 🚀

@claude
Copy link

claude bot commented Nov 27, 2025

Code Review - PR #3543: fix(pb): implement events and commands for actor wfs

Overview

This PR implements a significant architectural improvement to the Pegboard actor system by introducing proper event and command handling for actor workflows, particularly for the mk2 runner protocol. The changes refactor how actors communicate with runners, moving from signal-based communication to a more robust event-driven model with persistent command storage.

Summary of Changes

Core Changes:

  • Actor Workflow (pegboard/src/workflows/actor/mod.rs): Added support for handling batched events from mk2 runners, including event deduplication and acknowledgment batching
  • Actor Runtime (pegboard/src/workflows/actor/runtime.rs): New activity InsertAndSendCommands for persisting commands to UniversalDB before sending them to runners
  • Pegboard Runner Connection (pegboard-runner/src/conn.rs): Refactored initialization to handle mk2 protocol with prepopulated actor names, metadata storage, and missed command recovery
  • Actor Event Demuxer (pegboard-runner/src/actor_event_demuxer.rs): Enhanced to dispatch events with runner_id context for proper routing
  • Protocol Changes: Added new protocol structures for command/event wrappers with checkpoints for reliable delivery

Positive Aspects ✅

  1. Robust Event Tracking: The implementation of per-runner event indices with batch acknowledgment (lines 580-606 in actor/mod.rs) is well-designed and prevents duplicate processing
  2. Backward Compatibility: Excellent handling of both mk1 and mk2 protocols throughout the codebase
  3. Persistent Commands: Storing commands in UniversalDB before sending ensures they can be recovered after runner reconnections
  4. Clean Abstraction: The Init enum pattern (lines 203-255 in conn.rs) provides good protocol version abstraction

Issues & Concerns ⚠️

1. Critical: Missing Error Handling in Guard 🔴

Location: engine/packages/guard/src/routing/pegboard_gateway.rs:213

.await?;

if res.is_none() {

The ? operator was added but the error is not propagated properly. If the signal send fails, this will cause a panic rather than graceful error handling.

Recommendation: Review error handling strategy here - should this fail silently, log and continue, or propagate the error?


2. Data Integrity: Event Generation Filtering Logic 🟡

Location: engine/packages/pegboard/src/workflows/actor/mod.rs:462-469

let new_events = sig.events
    .iter()
    .filter(|event| {
        event.checkpoint.index > last_event_idx &&
        crate::utils::event_generation(&event.inner) == generation
    });

Issue: Events are filtered by generation but there's no logging when events are dropped due to generation mismatch. This could make debugging difficult.

Recommendation: Add tracing when events are filtered out:

let new_events = sig.events
    .iter()
    .filter(|event| {
        let matches = event.checkpoint.index > last_event_idx &&
            crate::utils::event_generation(&event.inner) == generation;
        if !matches {
            tracing::debug!(
                event_idx=?event.checkpoint.index,
                event_gen=?crate::utils::event_generation(&event.inner),
                current_gen=?generation,
                "filtering out event"
            );
        }
        matches
    });

3. Performance: Unbounded Command Storage 🟡

Location: engine/packages/pegboard/src/workflows/actor/runtime.rs:1109-1136

Issue: Commands are written to UniversalDB indefinitely without cleanup. Over time, this could lead to significant storage bloat, especially for long-running actors with many state transitions.

Recommendation: Implement a cleanup mechanism:

  • Add a background task to delete acknowledged commands
  • Or store only the last N commands per actor
  • Document the expected lifecycle of these command records

4. Concurrency: Race Condition in Demuxer 🟡

Location: engine/packages/pegboard-runner/src/actor_event_demuxer.rs:48-65

let _ = tx.send(event);

self.channels.insert(
    actor_id,
    Channel {
        tx,
        handle,
        last_seen: Instant::now(),
    },
);

Issue: The first event is sent before the channel is inserted into the map. If another event arrives for the same actor immediately after, it could create a duplicate channel since the first one isn't in the map yet.

Recommendation: Insert the channel first, then send the event:

self.channels.insert(
    actor_id,
    Channel {
        tx: tx.clone(),
        handle,
        last_seen: Instant::now(),
    },
);

let _ = tx.send(event);

5. Error Handling: Silent Failures in Metadata Parsing 🟡

Location: engine/packages/pegboard-runner/src/conn.rs:285-288 and 299-303

let metadata = serde_json::from_str::<
    serde_json::Map<String, serde_json::Value>,
>(&data.metadata)
.unwrap_or_default();

Issue: Invalid JSON in metadata fields silently falls back to empty objects. This could hide configuration errors.

Recommendation: Log when parsing fails:

let metadata = serde_json::from_str::<
    serde_json::Map<String, serde_json::Value>,
>(&data.metadata)
.map_err(|e| {
    tracing::warn!(?e, name=?name, "failed to parse actor metadata, using empty object");
    e
})
.unwrap_or_default();

6. Logging: Inconsistent Log Levels 🟢

Location: Various files

Issue: Some important state transitions use debug! when they should probably use info!:

  • Line 298 in actor/mod.rs: tracing::debug!(actor_id=?input.actor_id, "actor wake");
  • Line 282 in actor/mod.rs: tracing::warn!(actor_id=?input.actor_id, "actor lost");

Recommendation: Use info! for significant lifecycle events (wake, allocate, deallocate) and reserve debug! for verbose internal state.


7. Code Quality: Formatting Inconsistencies 🟢

Location: Multiple files (e.g., api-builder/src/middleware.rs:189)

Some lines exceed reasonable length limits. While this PR seems to fix some formatting issues, there are still instances of inconsistent formatting.

Note: Per CLAUDE.md, do not run cargo fmt automatically, but be mindful of readability.


Security Considerations 🔒

  1. ✅ Good: Commands are validated through the protocol's deserialization
  2. ✅ Good: Event indices prevent replay attacks
  3. ⚠️ Consider: The prepopulated actor names feature could potentially be abused if not properly validated upstream

Testing Recommendations 🧪

  1. Event Ordering: Add tests to verify events are processed in the correct order even when delivered out of order
  2. Reconnection Scenarios: Test that missed commands are properly recovered after runner reconnection
  3. Concurrent Actor Events: Test the demuxer with rapid concurrent events for the same actor
  4. Command Persistence: Verify commands survive workflow restarts
  5. Generation Transitions: Test edge cases around generation increments

Performance Considerations ⚡

  1. Event Batching: The batch size of 250 (EVENT_ACK_BATCH_SIZE) seems reasonable, but should be tuned based on production metrics
  2. Database Writes: Each InsertAndSendCommands call writes multiple UDB entries. Monitor transaction latency
  3. Memory: The runner_states HashMap could grow unbounded - the cleanup logic in CheckRunners is good but ensure it's called frequently enough

Documentation Needs 📝

  1. Add comments explaining the lifecycle of command records in UDB
  2. Document the event acknowledgment protocol and batch size rationale
  3. Explain the generation matching logic and why events from old generations are dropped
  4. Add architecture diagram showing the event/command flow between actors, workflows, and runners

Summary

This is a solid architectural improvement that adds much-needed reliability to the actor system. The core design is sound, with good separation of concerns and backward compatibility.

Before Merging:

Post-Merge:

  • Add the recommended logging for event filtering
  • Improve metadata parsing error visibility
  • Add comprehensive integration tests for the new event/command system

Overall Assessment: Approve with minor requested changes ✅


Review generated by Claude Code

@claude
Copy link

claude bot commented Nov 27, 2025

PR Review: Implement Events and Commands for Actor Workflows

This PR implements a new event and command system for actor workflows in the pegboard system, adding support for the MK2 protocol version.

Code Quality - Strengths:

  • Good separation of concerns between MK1 and MK2 protocol handling
  • Proper use of structured logging with tracing throughout
  • Consistent error handling patterns following the rivet error system
  • Transaction boundaries are well-defined with custom instrumentation spans

Issues to Address:

1. Critical: Missing Error Handling in conn.rs:288
Line 288 uses unwrap_or_default() which silently hides JSON parsing errors. Recommend logging errors before falling back to default.

2. Potential Race Condition in Event Processing
In actor/mod.rs:461-470, if events arrive out of order, this could skip events. Consider validating that indices are sequential or logging when gaps are detected.

3. Database Transaction Complexity
The handle_init function (conn.rs:257-388) performs multiple operations in a single transaction. Consider breaking into smaller transactions with clear rollback boundaries.

4. Missing Bounds Check
actor/runtime.rs:1105 increments last_command_idx without overflow protection. Consider using checked_add or document the assumption.

Performance Concerns:

1. Command Storage Growth
Commands are stored persistently in the database (keys/runner.rs:924-998) with no apparent cleanup mechanism. This could lead to unbounded growth. Recommend implementing a TTL or cleanup job.

2. Metadata Chunking
Is there a limit on metadata size? Document the expected upper bound to prevent abuse.

3. Event Demuxer Channels
Creates unbounded channels per actor (actor_event_demuxer.rs:26-108). Risk of memory exhaustion if events accumulate faster than processing. Recommend bounded channels with backpressure.

Security Concerns:

1. Base64 Decoding Without Size Limit (actor/runtime.rs:579)
No validation of decoded size could allow memory exhaustion. Add maximum size check.

2. Metadata Injection (conn.rs:300-303)
Arbitrary JSON could be injected. Validate structure and size limits on metadata.

3. Missing Input Validation
The Init enum deserializes user input but doesn't validate runner name format, version bounds, or total slots sanity checks.

Test Coverage:

Missing tests for:

  1. Event ordering and out-of-order handling
  2. Command replay after runner reconnection
  3. Metadata chunking edge cases
  4. Race conditions between allocation and destroy signals
  5. Event acknowledgment batching behavior
  6. Protocol version negotiation

Recommendations Summary:

Must Fix Before Merge:

  • Add input validation for metadata and base64 inputs (security)
  • Document or implement cleanup strategy for command storage (performance)
  • Add bounds checking for command index arithmetic

Should Fix:

  • Improve error handling in JSON parsing
  • Add validation for out-of-order event handling
  • Break down large transactions in handle_init

Nice to Have:

  • Add integration tests for MK2 protocol
  • Document state machine transitions
  • Add size limits to metadata and actor inputs

Conclusion:

This is a substantial and well-structured change that adds important functionality and follows Rivet conventions. However, security and performance concerns around input validation and resource management should be addressed before merging.

Overall Assessment: Approve with requested changes

@MasterPtato MasterPtato force-pushed the 11-21-fix_make_all_uses_of_protocol_handle_v4_-_v3 branch from 360920a to 7837fe9 Compare November 27, 2025 00:58
@MasterPtato MasterPtato force-pushed the 11-24-fix_pb_implement_events_and_commands_for_actor_wfs branch from 8d5a457 to 02023f8 Compare November 27, 2025 00:58
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants